javascript

您所在的位置:网站首页 js oncompleted多次 javascript

javascript

#javascript| 来源: 网络整理| 查看: 265

我正在尝试创建一个可观察对象,它从许多异步操作(来自 Jenkins 服务器的 http 请求)中生成值,一旦所有操作完成,它就会通知订阅者。我觉得我一定是误会了什么,因为这没有达到我的预期。

'use strict'; let Rx = require('rx'); let _ = require('lodash'); let values = [ {'id': 1, 'status': true}, {'id': 2, 'status': true}, {'id': 3, 'status': true} ]; function valuesObservable() { return Rx.Observable.create(function(observer) { _.map(values, function(value) { var millisecondsToWait = 1000; setTimeout(function() { // just using setTimeout here to construct the example console.log("Sending value: ", value); observer.onNext(value) }, millisecondsToWait); }); console.log("valuesObservable Sending onCompleted"); observer.onCompleted() }); } let observer = Rx.Observer.create((data) => { console.log("Received Data: ", data); // do something with the info }, (error) => { console.log("Error: ", error); }, () => { console.log("DONE!"); // do something else once done }); valuesObservable().subscribe(observer);

运行这个,我得到输出:

valuesObservable Sending onCompleted DONE! Sending value: { id: 1, status: true } Sending value: { id: 2, status: true } Sending value: { id: 3, status: true }

虽然我更希望看到的是:

Sending value: { id: 1, status: true } Received Data: { id: 1, status: true } Sending value: { id: 2, status: true } Received Data: { id: 2, status: true } Sending value: { id: 3, status: true } Received Data: { id: 3, status: true } valuesObservable Sending onCompleted DONE!

我实际上并不关心列表中项目的顺序,我只是希望观察者能够接收它们。

我相信正在发生的事情是 Javascript 异步触发超时函数,并立即继续到 observer.onCompleted() 行。一旦订阅观察者接收到 onCompleted 事件(这个词正确吗?),它就决定它已经完成并自行处理。然后,当异步操作完成并且可观察对象触发 onNext 时,观察者不再存在,无法对它们采取任何操作。

如果我对此是正确的,我仍然对如何让它按照我想要的方式运行感到困惑。我是否在没有意识到的情况下偶然发现了反模式?有没有更好的方法来处理这整件事?

编辑:

自从我使用 setTimeout 构建我的示例后,我意识到我可以通过给可观察对象一个超时来使用它来部分解决我的问题。

function valuesObservable() { return Rx.Observable.create(function(observer) { let observableTimeout = 10000; setTimeout(function() { console.log("valuesObservable Sending onCompleted"); observer.onCompleted(); }, observableTimeout); _.map(values, function(value) { let millisecondsToWait = 1000; setTimeout(function() { console.log("Sending value: ", value); observer.onNext(value) }, millisecondsToWait); }); }); }

这会按照我想要的顺序(数据,然后是完成)从可观察对象中获取所有信息,但是根据超时的选择,我可能会错过一些数据,或者必须等待很长时间才能完成事件.这只是我不得不忍受的异步编程的固有问题吗?

最佳答案

是的,有更好的方法。现在的问题是您依靠时间延迟来进行同步,而实际上您可以使用 Observable 运算符来代替。

第一步是放弃直接使用 setTimeout。而是使用 timer

Rx.Observable.timer(waitTime);

接下来,您可以提升值数组到一个 Observable 中,这样每个值都作为一个事件发出,方法是:

Rx.Observable.from(values);

最后,您将使用 flatMap 将这些值转换为 Observables 并将它们展平为最终序列。结果是一个 Observable,每次源 timers 发射时发射,并在所有源 Observable 完成时完成。

Rx.Observable.from(values) .flatMap( // Map the value into a stream value => Rx.Observable.timer(waitTime), // This function maps the value returned from the timer Observable // back into the original value you wanted to emit value => value )

因此完整的 valuesObservable 函数看起来像这样:

function valuesObservable(values) { return Rx.Observable.from(values) .flatMap( value => Rx.Observable.timer(waitTime), value => value ) .do( x => console.log(`Sending value: ${value}`), null, () => console.log('Sending values completed') ); }

请注意,如果您不使用演示流,上述内容也可以正常工作,即如果您真的有 http 流,您甚至可以使用 merge(或 concat)来简化以保持秩序)

Rx.Observable.from(streams) .flatMap(stream => stream); // OR Rx.Observable.from(streams).merge(); // Or simply Rx.Observable.mergeAll(streams);

关于javascript - RxJS Observable 在一些异步操作后触发 onCompleted,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39896230/



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3